iT邦幫忙

2023 iThome 鐵人賽

DAY 27
0

雖然已經到尾聲了,但我發現有個重要的 Airflow 元件沒有提到,所以必須來補這篇。

Airflow XCom 是一種在不同的 task 之間傳遞資料的機制,它的全名是 cross-communication。Airflow XCom 的觀念是,每個 task 都可以產生一個或多個 key-value pair 的資料,並將它們儲存到 metadata database 中。這些資料可以被其他的 task 讀取或使用,以實現 task 之間的資料交換或依賴。

Airflow XCom 的優點是,它可以讓 task 之間的邏輯更清晰,並減少重複的程式碼。Airflow XCom 的缺點是,它會增加 metadata database 的負擔,並可能影響 Airflow 的效能。因為每次傳遞資料時,都需要將資料序列化並儲存到資料庫中。這樣會增加資料庫的負擔,並且延長 task 的執行時間。

因此,建議只使用 Airflow XCom 傳遞少量且重要的資料,例如參數或狀態,或是某些路徑。

使用情境

task1 產生了一批資料,我們有三種選擇:

  1. 將後續的處理 (可能是型別轉換,或是載入資料庫) 寫在同一個 task 內。
  2. 將這批資料寫入某處,例如 HDFS, Kafka, local disk (不太建議), AWS S3 等等,並將路徑用 XCom 傳給下一個 task。
  3. 將整批資料寫到 XCom 傳輸。

選擇一

優點:沒有任何的資源浪費,所有資料都在記憶體內流轉,沒用到 XCom。

缺點:沒有妥善利用 Airflow 提供的 Task 功能,也沒辦法做 pipeline 的排程優化。

如果資料後續的轉換是複雜的,像是還要再跟其他資料 join 的話,不建議這樣做。但如果足夠單純,是可以考慮的。

選擇二

不建議寫到 local disk 是因為如果你的 Airflow 大到要用分散式架構,每個 operator 可能在不同的機器上執行,所以 local disk 的資料可能會讀取失敗。

至於其他儲存如 S3,通常你上傳之後都會得到一個路徑如 “xxx_example_path@S3”,則我們可以將這個路徑透過 XCom 傳給其他的 task,下一個 task 再去將資料載回來後處理。

def push_data(**kwargs):
    data = query_from_somewhere()
    path = load_to_s3(data) 
    # 這個函數會回傳S3路徑,Airflow 預設會將它存在 XCom 內   
    return path

def pull_data(**kwargs):
    # 這個函數會從 XCom 中取得資料
    path = kwargs['ti'].xcom_pull(task_ids='push_task')
    data = query_from_s3(path)
    do_something(path)
    

with DAG('xcom_example', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    push_task = PythonOperator(
        task_id='push_task',
        python_callable=push_data
    )

    pull_task = PythonOperator(
        task_id='pull_task',
        python_callable=pull_data
    )

    push_task >> pull_task

優點:傳送路徑不太可能遇到資料過多的狀況,每個 task 也沒什麼相依性,就看資料是否存在。

缺點:有點脫褲子放屁,每次需要資料時都要去某個地方重新載入。如果資料真的很大,也不見得多省時省資源。

考慮網路頻寬跟流量,如果你的資料很大,Airflow 又不是在 AWS 或 GCP 上的話,可以考慮架一個 HDFS 來存放。如果在雲上,又願意捨得花流量費的話,那就用它們的倉儲吧。

選擇三

跟選擇二的程式類似,只是寫入的是資料本身。要注意如果是自訂 class,需要先實作序列化跟反序列化,才能直接將該物化寫入 XCom。

另外,依你的 metadata database 而定,有不同的傳輸上限

PostgreSQL: 1 GB

MySQL: 64KB

sqlite: 2 GB

如果你沒有任何額外設定,預設是 48KB。所以大量資料就會很容易炸掉。

優點:資料直接傳輸,最直覺。

缺點:大量資料有可能超過 database 上限,導致執行失敗


上一篇
Flink Service 與 jar 的關係 - Day26
下一篇
讓 Airflow 呼叫 Flink - Day28
系列文
用 Airflow & Flink 來開發 ETL 吧30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言